package com.bawei.persona.realtime.app.test;


import com.bawei.persona.realtime.bean.TableProcess;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

/**
 * 上海大数据学院
 * 项目规划及管理：李剑
 * 技术指导及需求分析：郭洵
 * 编程：楚志高
 *
 * @author bawei  bigdata sh
 * @since 2021-06-11
 */
public class JdbcDemo {

    // Positions of the fields inside each Row produced by "select * from table_process".
    // NOTE(review): these positions depend on the physical column order of the table,
    // because the query uses "select *" — confirm against the table DDL.
    private static final int SOURCE_TABLE_POS = 0;
    private static final int OPERATE_TYPE_POS = 1;
    private static final int SINK_TYPE_POS = 2;
    private static final int SINK_TABLE_POS = 3;
    private static final int SINK_COLUMNS_POS = 4;
    private static final int SINK_PK_POS = 5;
    private static final int SINK_EXTEND_POS = 6;

    /**
     * Loads all rows of the {@code table_process} table over JDBC and converts each one into a
     * {@link TableProcess} bean.
     *
     * <p>Per the original author's note, the returned configuration is meant to be used later
     * to decide how data is split/routed.
     *
     * <p>The first five fields of every row must be non-null; the last two (sink primary key
     * and sink extension) are optional and are mapped to the empty string when null.
     *
     * <p>NOTE(review): the JDBC driver, URL, user name and password are hard-coded below;
     * consider moving them to external configuration.
     *
     * @param env the Flink batch execution environment used to create and run the JDBC input
     * @return one {@link TableProcess} per row read, in the order returned by {@code collect()}
     * @throws NullPointerException if one of the five mandatory fields of a row is null
     * @throws Exception if creating the input or collecting the rows fails
     */
    public static List<TableProcess> jdbcRead(ExecutionEnvironment env) throws Exception {
        DataSet<Row> configRows = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                // database connection settings
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://hadoop101:3306/gmall2021")
                .setUsername("root")
                .setPassword("000000")
                .setQuery("select * from table_process")
                // types of the queried columns: all seven are read as strings
                .setRowTypeInfo(new RowTypeInfo(
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO))
                .finish());

        // collect() brings the whole result set back to the caller as a List.
        List<Row> rows = configRows.collect();

        List<TableProcess> result = new ArrayList<>(rows.size());
        for (Row row : rows) {
            result.add(toTableProcess(row));
        }
        return result;
    }

    /** Maps one row of the configuration query onto a new {@link TableProcess} bean. */
    private static TableProcess toTableProcess(Row row) {
        TableProcess tableProcess = new TableProcess();
        tableProcess.setSourceTable(requiredField(row, SOURCE_TABLE_POS, "source_table"));
        tableProcess.setOperateType(requiredField(row, OPERATE_TYPE_POS, "operate_type"));
        tableProcess.setSinkType(requiredField(row, SINK_TYPE_POS, "sink_type"));
        tableProcess.setSinkTable(requiredField(row, SINK_TABLE_POS, "sink_table"));
        tableProcess.setSinkColumns(requiredField(row, SINK_COLUMNS_POS, "sink_columns"));
        tableProcess.setSinkPk(optionalField(row, SINK_PK_POS));
        tableProcess.setSinkExtend(optionalField(row, SINK_EXTEND_POS));
        return tableProcess;
    }

    /**
     * Returns the string form of a mandatory field, failing with a descriptive
     * {@link NullPointerException} (instead of a message-less one) when the field is null.
     * The {@code label} is only used in the error message.
     */
    private static String requiredField(Row row, int pos, String label) {
        Object value = row.getField(pos);
        if (value == null) {
            throw new NullPointerException(
                    "table_process field '" + label + "' (position " + pos + ") is null in row: " + row);
        }
        return value.toString();
    }

    /** Returns the string form of an optional field, or the empty string when it is null. */
    private static String optionalField(Row row, int pos) {
        Object value = row.getField(pos);
        return value == null ? "" : value.toString();
    }

}
